查看原文
其他

Pulsar Functions 的深入小分享

🐟 StreamNative 2021-10-18

上周末由 Apache 龙为我们带来了 Pulsar Functions 相关的干货分享,整体内容基于 Pulsar Summit 2020 中 Splunk 工程师 Sanjeev Kulkarni 分享的《Pulsar Functions:A Deep Dive》。





Pulsar Functions 简单介绍


Pulsar Functions 是一个轻量级的计算框架,其主要目的是为了提供一个部署简单、运维简单的平台,主要突出“简单”使用的特点。


大概运行过程就是在 input topic 中接收消息,进入 functions 中进行运算和消息处理,最后输出到 output topic 中的过程。



Pulsar Functions 可以覆盖 90% 以上情况的流处理场景。比如:消息过滤、消息路由、消息增强等。更多使用场景可以参考之前发布的技术博客:基于 Pulsar Functions 的事件处理设计模式


Pulsar Functions 主要分为三大模块:Instance、Runtime、Function-worker。


在 Runtime 层面主要支持三种形式:thread、process 和外部支持的 Kubernetes。


更多关于 Pulsar Functions 介绍和基础层源码讲解,可以参考回放视频:01:00-13:20 时间段。





深入了解 Functions


此分享的解析角度,从 functions 内部几个重要节点出发。主要是:如何提交 Pulsar Functions、Functions Worker 如何调度以及如何运行 Pulsar Functions。


|| Submission Workflow

在执行/创建一个 function 时,需要通过 `FunctionConfig` 的形式暴露给用户,用户通过指定 `FunctionConfig` 来进行 functions 内部操作。


Functions 可以提交到任意 worker,通过相应的 Json 文件进行相应 tenant/namespace/name 等输入/输出配置的提供。


在配置构建完成后,会有一个 AuthN/AuthZ checks 过程,去检测在配置 function 过程中是否添加了与「加密」相关的设置。之后便会对 `FunctionConfig` 文件内格式以及其他方面进行再次核实。


最终这些 jar 包会存储在 BookKeeper 端,方便后续再次调用。


此时完成以上操作后,submission workflow 会把所有的 functions 提交到 MetaData Topic,并用 map from <FQFN, FunctionMetaData> 格式进行记录。


FQFN 就是 Fully Qualified Function Name,格式就是 tenant、namespace、functions name 三个字段拼合而成。


FQFN 作为存储元数据的 key,会把用户提供的 `FunctionConfig` 字段填充到 Function MetaData 中。图中的 MetaData Topic Tailer,主要目的是进行实时监测 MetaData Topic,根据实时更新变化写入等动态,进行后续操作。


在 Functions 内部没有真正开始执行「创建/更新/删除」等操作之前,需要进行状态更新。大体过程为:


  • 复制当前状态

  • 进行状态合并更新

  • 增加当前版本数

  • 将数据写入 MetaData Topic

  • Tailer 进行数据读取和验证

  • 如果没有冲突,则整个更新


上面的整个流程是在单台 function worker 的情况下,但是在多台 function worker 中就有可能会出现冲突

在多个 function worker 运行的场景下,当对某一个 function 进行并发更新时,会出现冲突的情况。当出现冲突时,其解决方式是使用 First Writer Win 的策略,即当第一个请求被成功接收之后,其它请求将会被拒绝。

经过以上过程的处理,我们可以看出在 submission workflow 过程中,functions 有优点也有缺点。比如可以提交到任意 worker、有固定的状态机。当然无限的数据增长却没有配置相关压缩数据的一些操作,着实有一丢丢可惜。

更多关于提交 Functions 的操作介绍,可以参考回放视频 13:30-37:00 时间段。

|| Scheduling workflow

当 function worker 有了上述元数据信息后,那么接下来将如何去调度整个流程呢?

Function worker 的整个调度过程都在 `IScheduler Interface` 接口中执行。


同时 Function worker 会在以下状态时开启「调度模式」。

  • CRUD 操作:创建/更新/删除

  • Worker 变动:如创建新 worker、leadership 发生变化等


虽然 function 可以提交到任意 worker 中,但是调度过程却只能在具有「leader」属性的 worker 中进行。


那么如何确认「谁是老板」呢?

在以前直播中我们也有专门讲过 Pulsar 消息的订阅模式,其中有一个是 Failover 模式。这里 Pulsar Functions 也借用了此模式。

当所有 worker 进入时,会去以 failover 模式订阅「Coordination Topic」。按照 failover 的规则,同一时间只会有一个活跃的「小朋友」成为「老板」。所以以此类推,上图的 worker2 也就成为了那一时间段的 leader。

有了 Leader Worker 后,就需要它进行数据的写入,将其写入到 Assignment Topic 中,用来记录调度信息。


更多关于调度流程的讲解,可以参考回放视频 37:00-45:00 时间段。

|| Execution Workflow

那么提交并调度完成后,整个 Functions 是如何运行起来的呢?


在上图中,Assignment Tailer 监听到 Topic 中的变化,此时就会将此动作变化传递给 Function RunTime Manager。同时借由 Spawner 进行一些列后续操作。

Spawner 是使用 Functions 时的一个抽象执行环境,也具有 Functions 生命周期管理的功能,同时通过 GRPC 通过进行与 Functions 的数据交互。

更多关于调度运行 Functions 的源码细节讲解,可以参考回放视频 45:00-52:00 时间段。





Future Work


未来关于 Pulsar Functions 的后续更新和产品方向,一个是会在现有功能上进行进一步的完善和改进,比如前边提到的 Function MetaData Topic 数据无限增长的情况,那么后续是否给其研发一个可压缩的功能;还有就是动态 RunTime selection 的扩充模式等。

另一个就是继续扩展出 Pulsar Functions 的其他新功能,如正在开发中的 Function Mesh。可以期待一下哦!




Q&A


Q:Pulsar functions 可以直接从第三方 key-value database 中读取数据,处理完后再将结果写入key-value database 吗?还是只能通过 Pulsar IO 先写入 Pulsar topic,Pulsar Functions 再从该 topic 读取数据做处理?

A:Pulsar Functions 是不支持的,可以借助 Pulsar IO 的功能去实现。就是按照问题后部分描述的方式进行。




总结


本次 TGIP-CN,我们也借由小龙的讲解,更进一步了解到 Pulsar Functions 的逻辑和运行过程,对于后续进行 Pulsar 本身扩展应用有了更清晰的认识。

更多细节参考下方回放视频:


本周末,我们将继续为大家带来关于 Apache Pulsar 如何应用在交互式查询中的分享。点击「阅读原文」可以进行直播报名。


: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存